Перейти к основному содержимому

8.05. RabbitMQ

Разработчику Архитектору Инженеру

RabbitMQ

Что такое RabbitMQ?

RabbitMQ — это популярный брокер сообщений (Message Broker), который реализует протокол AMQP. Он используется для асинхронного обмена данными между приложениями через очереди. RabbitMQ обеспечивает надёжную доставку сообщений, масштабируемость и гибкость.

Официальный сайт - https://www.rabbitmq.com/

Сообщения хранятся в очередях до тех пор, пока не будут обработаны. Производитель отправляет сообщение в очередь, а потребитель забирает его позже.

image-11.png


Производитель-потребитель

RabbitMQ использует модель «производитель-потребитель» с промежуточным хранилищем — очередью:

  • Producer (Производитель) отправляет сообщения в RabbitMQ, может быть любым приложением или системой;
  • Exchange (Обменник) принимает сообщения от производителя и определяет, в какую очередь поместить сообщение, исходя из правил маршрутизации;
  • Queue (Очередь) хранит сообщения до тех пор, пока они не будут обработаны потребителем.
  • Consumer (Потребитель) забирает сообщения из очереди и обрабатывает их согласно логике приложения.

RabbitMQ поддерживает несколько типов обменников (exchanges), которые определяют правила маршрутизации сообщений:

  • Direct - сообщение отправляется в очередь, которая соответствует ключу маршрутизации;
  • Fanout - сообщение рассылается во все очереди, связанные с этим обменником;
  • Topic - сообщение отправляется в очереди, соответствующие шаблону ключа маршрутизации;
  • Headers - маршрутизация основана на заголовках сообщения, а не на ключе.

Virtual Host — это изолированное пространство внутри RabbitMQ, которое позволяет разделять ресурсы (очереди, обменники, пользователи) между различными проектами или командами. Каждый Virtual Host имеет свои собственные очереди, обменники и разрешения.

Создать Virtual Host можно через консоль или веб-интерфейс. RabbitMQ предоставляет встроенный веб-интерфейс для мониторинга и управления брокером. Этот интерфейс называется RabbitMQ Management, который представляет собой плагин rabbitmq_management.

RabbitMQ Management по умолчанию доступен по адресу http://localhost:15672 под учётными данными guest/guest (логин/пароль). Интерфейс позволяет просматривать статистику (количество сообщений, скорость обработки, использование памяти), состояние очередей, обменников и соединений, создавать и удалять очереди, обменники.


Архитектура RabbitMQ

Базовые концепции модели обмена

Очереди, обменники и привязки

Очередь (queue) — буфер хранения сообщений с семантикой FIFO. Очередь принадлежит одному узлу кластера и может быть объявлена как постоянная (durable) для сохранения после перезапуска брокера.

Обменник (exchange) — точка маршрутизации, принимающая сообщения от продюсеров и распределяющая их по очередям согласно правилам привязки (bindings). Привязка определяет критерий маршрутизации: ключ маршрутизации (routing key) для direct/topic обменников или шаблон для headers.

Сообщение содержит:

  • Тело (payload) — произвольные байты
  • Свойства (properties): content_type, message_id, correlation_id, reply_to, expiration, priority
  • Флаг delivery_mode: 1 (volatile) или 2 (persistent) для надёжного хранения

Типы обменников и стратегии маршрутизации

  • Direct — точное совпадение ключа маршрутизации с привязкой. Используется для адресной доставки.
  • Fanout — рассылка во все привязанные очереди без учёта ключа. Реализует паттерн публикации-подписки.
  • Topic — шаблонное совпадение ключа (* — один сегмент, # — ноль или более сегментов). Пример: logs.error.# соответствует logs.error.api.
  • Headers — маршрутизация по совпадению заголовков сообщения (headers) с параметрами привязки. Поддерживает режимы any (логическое ИЛИ) и all (логическое И).
  • Default (амплуа) — скрытый direct-обменник с именем "", автоматически привязанный ко всем очередям по имени очереди как ключу.

Модель доставки и управление потоком

Продюсеры и потребители

Продюсер публикует сообщения в обменник, не зная конечных очередей. Потребитель (consumer) подписывается на очередь через метод basic.consume и получает сообщения асинхронно через обратный вызов.

Альтернативно используется синхронный метод basic.get для единичного извлечения.

Подтверждения и надёжность

  • Publisher confirms — механизм подтверждения доставки сообщения в очередь. Брокер отправляет basic.ack после сохранения сообщения на диск (для постоянных очередей) или в память.
  • Consumer acknowledgements — потребитель подтверждает обработку через basic.ack. При отказе обработки отправляется basic.nack или basic.reject с флагом requeue для возврата в очередь.
  • QoS prefetch — ограничение количества неподтверждённых сообщений на канал через basic.qos(prefetch_count=N). Предотвращает перегрузку потребителя.

Возврат сообщений и обработка ошибок

Сообщение возвращается продюсеру через basic.return, если:

  • Обменник не имеет подходящих привязок (режим mandatory=true)
  • Очередь переполнена (лимит длины или дискового пространства)
  • Превышено время жизни сообщения (TTL)

Кластеризация и отказоустойчивость

Архитектура кластера

Кластер объединяет несколько узлов (nodes) под единым пространством имён очередей и обменников. Все узлы имеют общее состояние метаданных через распределённую базу Mnesia.

Очереди физически размещаются на одном узле (дискретное размещение), но доступны для публикации/подписки со всех узлов кластера.

Репликация очередей

  • Mirrored queues (устаревший механизм) — репликация очередей через политики (policies) с указанием узлов-реплик. Управление лидером выполняется автоматически.
  • Quorum queues (рекомендуемый механизм) — реализация на основе алгоритма Raft. Обеспечивает строгую упорядоченность, отказоустойчивость и защиту от потери данных при разделении сети (split-brain protection). Требует нечётного числа реплик (3, 5, 7).

Пример политики для quorum-очередей:

{
"queue-master-locator": "client-local",
"dead-letter-exchange": "dlx.events"
}

Виртуальные хосты

Виртуальный хост (vhost) — изолированное пространство имён внутри кластера, содержащее собственные очереди, обменники и пользователей. Используется для разделения окружений (например, prod, staging, dev) или клиентов в мультитенантных системах.


Внутреннее устройство и соединения

Каналы и соединения

Соединение (connection) — TCP-соединение между клиентом и брокером. Канал (channel) — лёгковесный виртуальный канал поверх соединения, инкапсулирующий отдельный диалог. Один канал гарантирует упорядоченность операций; несколько каналов в одном соединении позволяют параллельную работу без создания множества TCP-соединений.

Хранение сообщений

Сообщения хранятся в файлах журналов на диске с индексами для быстрого доступа. Для постоянных очередей (durable=true) и сообщений (delivery_mode=2) данные сбрасываются на диск перед отправкой подтверждения. Временные очереди и сообщения хранятся только в памяти. Поддерживается сжатие журналов и фоновая дефрагментация.


Авторизация и управление доступом

Модель прав доступа

Доступ контролируется на уровне виртуального хоста через права:

  • configure — управление очередями/обменниками (объявление, удаление)
  • write — публикация в обменники
  • read — потребление из очередей

Права назначаются пользователю для конкретного vhost:

rabbitmqctl set_permissions -p /prod user1 ".*" ".*" ".*"

Аутентификация

Поддерживаются механизмы:

  • Встроенная аутентификация (логин/пароль в базе)
  • Внешняя аутентификация через плагины (LDAP, OAuth 2.0, JWT)
  • Аутентификация по сертификатам (TLS client certificate)

Программное управление через Management API

HTTP API предоставляет операции администрирования:

Управление очередями и обменниками

# Python: создание очереди через HTTP API
import requests
requests.put(
"http://admin:pass@localhost:15672/api/queues/%2F/orders",
json={
"durable": True,
"arguments": {
"x-queue-type": "quorum",
"x-dead-letter-exchange": "dlx.orders"
}
}
)

Мониторинг состояния

// C#: получение метрик очереди
var response = await httpClient.GetAsync(
"http://localhost:15672/api/queues/%2F/orders");
var metrics = await response.Content.ReadFromJsonAsync<QueueMetrics>();
// metrics.messages_ready, metrics.consumers, metrics.memory

Управление политиками

// Java: применение политики для dead-lettering
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.main");
args.put("x-message-ttl", 60000);
channel.queueDeclare("tasks", true, false, false, args);

Утилиты командной строки

Основные операции через rabbitmqctl

# Список узлов кластера
rabbitmqctl cluster_status

# Создание пользователя и прав
rabbitmqctl add_user app-user securepass
rabbitmqctl set_permissions -p /prod app-user ".*" ".*" ".*"

# Применение политики quorum-очередей
rabbitmqctl set_policy quorum-policy \
"^q\." '{"queue-mode":"quorum"}' --apply-to queues

# Просмотр сообщений в очереди (требует плагина shovel)
rabbitmqctl list_queues name messages_ready consumers

Управление через rabbitmqadmin (Python CLI)

# Экспорт конфигурации
rabbitmqadmin export config.json

# Импорт конфигурации
rabbitmqadmin import config.json

# Публикация тестового сообщения
rabbitmqadmin publish exchange=amq.default \
routing_key=debug payload="test message"

Настройка клиентских API

Producer pattern

Ключевые параметры подключения:

  • virtual_host — изолированное пространство имён
  • heartbeat — интервал проверки активности соединения (рекомендуется 60 сек)
  • connection_timeout — таймаут установки соединения
  • publisher_confirms — включение механизма подтверждений

Пример публикации с подтверждением (C#):

var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicPublish(
exchange: "orders",
routingKey: "order.created",
basicProperties: props,
body: body
);
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));

Consumer pattern

Рекомендуемая конфигурация:

  • prefetch_count=1..10 — баланс между пропускной способностью и справедливостью
  • auto_ack=false — ручное подтверждение после успешной обработки
  • consumer_tag — уникальный идентификатор потребителя для управления

Пример обработки с подтверждением (Python):

def callback(ch, method, properties, body):
try:
process(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False # отправка в DLX при настройке
)

channel.basic_qos(prefetch_count=3)
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=False
)
channel.start_consuming()

Dead Letter Exchange (DLX)

Механизм обработки проваленных сообщений:

  1. Очередь объявляется с аргументами x-dead-letter-exchange и x-dead-letter-routing-key
  2. При отклонении (nack/reject с requeue=false) или истечении TTL сообщение перенаправляется в DLX
  3. DLX маршрутизирует сообщение в очередь мониторинга или повторных попыток

Установка и настройка

Как настроить RabbitMQ?

Erlang

Установка Erlang на сервер — это зависимость для RabbitMQ (он работает поверх Erlang). Пример:

sudo apt update
sudo apt install erlang

Установка RabbitMQ.

Сначала нужно добавить официальный репозиторий RabbitMQ:

sudo apt-get install curl gnupg
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo gpg --dearmor > /usr/share/keyrings/rabbitmq-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/rabbitmq-archive-keyring.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt update

А затем установить RabbitMQ:

sudo apt install rabbitmq-server

Запуск RabbitMQ

Для этого нужно запустить службу RabbitMQ:

sudo systemctl start rabbitmq-server

Желательно также включить автозапуск RabbitMQ при загрузке системы:

sudo systemctl enable rabbitmq-server

Чтобы убедиться в корректности запуска, можно проверить статус:

sudo systemctl status rabbitmq-server

Виртуальный хост

Создание виртуального хоста для изоляции проектов:

sudo rabbitmqctl add_vhost my_vhost

Настройка прав

Настройка прав доступа к виртуальному хосту.

Создание пользователя:

sudo rabbitmqctl add_user my_user my_password

Назначение прав на виртуальный хост:

sudo rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"

Включение плагина управления

Активировать веб-интерфейс:

sudo rabbitmq-plugins enable rabbitmq_management

Веб-интерфейс будет доступен по адресу http://localhost:15672

Подключение к RabbitMQ

В разных языках программирования используются соответствующие клиентские библиотеки. Нужно добавить к проекту программы библиотеку, а затем:

  • создать соединение с RabbitMQ, указав хост, порт и учётные данные;
  • создать канал (логическое соединение внутри физического соединения - все операции выполняются через канал);
  • создать обменник с указанием типа;
  • создать очередь с уникальным именем;
  • связать обменник с очередью, указав ключ маршрутизации.

C# - библиотека RabbitMQ.Client.

Пример использования:

using RabbitMQ.Client;
using System.Text;

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// Создание очереди
channel.QueueDeclare(queue: "my_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
// Отправка сообщения
string message = "Hello, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "my_queue", basicProperties: null, body: body);
Console.WriteLine("Message sent");
}

Java - библиотека amqp-client.

Пример использования:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class RabbitMQExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Создание очереди
channel.queueDeclare("my_queue", false, false, false, null);
// Отправка сообщения
String message = "Hello, RabbitMQ!";
channel.basicPublish("", "my_queue", null, message.getBytes());
System.out.println("Message sent");
}
}
}

Python - библиотека pika.

Пример использования:

import pika

# Подключение к RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='my_queue')
# Отправка сообщения
channel.basic_publish(exchange='', routing_key='my_queue', body='Hello, RabbitMQ!')
print("Message sent")
connection.close()

JavaScript (Node.js) - библиотека amqplib.

Пример использования:

const amqp = require('amqplib');
async function sendMessage() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// Создание очереди
const queue = 'my_queue';
await channel.assertQueue(queue, { durable: false });
// Отправка сообщения
const message = 'Hello, RabbitMQ!';
channel.sendToQueue(queue, Buffer.from(message));
console.log("Message sent");
setTimeout(() => {
connection.close();
}, 500);
}
sendMessage();

PHP - библиотека php-amqplib.

Пример использования:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// Создание очереди
$channel->queue_declare('my_queue', false, false, false, false);
// Отправка сообщения
$msg = new AMQPMessage('Hello, RabbitMQ!');
$channel->basic_publish($msg, '', 'my_queue');
echo "Message sent\n";
$channel->close();
$connection->close();

Отправка и получение сообщений.

Отправка сообщения (Producer) выполняется через обменник.

Получение сообщения (Consumer) выполняется из очереди. Нужно подписаться на очередь и получить сообщение.

Архитектура RabbitMQ основана на гибкой модели маршрутизации через обменники и привязки, обеспечивающей декуплирование продюсеров и потребителей. Отказоустойчивость достигается через кластеризацию с репликацией очередей (quorum queues), а надёжность доставки — через подтверждения публикации и потребления. Эффективное использование требует правильной настройки QoS, управления временем жизни сообщений и применения паттернов обработки ошибок (DLX). Для продакшн-сред рекомендуется использовать quorum queues вместо mirrored, включать publisher confirms, настраивать мониторинг метрик (длина очередей, скорость обработки) и применять политики для автоматического управления параметрами очередей.

Мониторинг

В веб-интерфейсе можно анализировать список очередей и статистику, а в разделе Exchanges можно увидеть обменники и их привязки.

Масштабирование

Для масштабирования можно настроить кластер RabbitMQ. Но важно убедиться, что все узлы кластера имеют одинаковую версию RabbitMQ и Erlang.